Celery 是分散式的工作佇列系統,它可以用來作什麼呢?舉幾個例子來說,會比較容易理解:
也就是長時間、需要重試等等的工作,都適合使用工作佇列系統來處理。
工作佇列系統主體有幾個部分:
運作時,會以訊息告知 broker 有工作要執行,訊息裡包含了工作所需的參數或內容,接著 broker 會把工作放到佇列裡。worker 會持續的監控佇列,在有工作時,就會拿出來處理。處理完成後,再把結果告知 broker 。
Celery 有以下優點:
那跟 Django 有什麼關係呢?網站處理 HTTP 請求,我們對 HTTP 請求的預期一般來說是越快越好,如果花太多時間,就會阻塞住網站的處理。所以,長時間的工作就會希望可以在背景執行,這個背景執行,就可以使用 Celery,把背景執行工作放到工作佇列裡,Celery worker 就會去執行。
專案網址:https://docs.celeryproject.org/en/stable/index.html
poetry add celery redis
sudo apt-get install rabbitmq-server
這裡也順便一起安裝 RabbitMQ 這個 Message Queue server 用來作為 Broker
# settings.py
import environ
env = environ.Env()
# ...
REDIS_HOST = env('REDIS_HOST', default='localhost')
REDIS_PORT = env('REDIS_PORT', default=6379)
RABBITMQ_HOST = env('RABBITMQ_HOST', default='localhost')
CELERY_RESULT_BACKEND = 'redis://{redis_host}:{redis_port}/1'.format(
redis_host=REDIS_HOST,
redis_port=REDIS_PORT)
CELERY_BROKER_URL = 'amqp://guest@{rabbitmq_host}//'.format(
rabbitmq_host=RABBITMQ_HOST)
CELERY_ENABLE_UTC = True
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# When using librabbitmq, must use PROTOCOL 1
# https://stackoverflow.com/questions/42081061/celery-rabbitmqwarning-mainprocess-received-and-deleted-unknown-message-wron/42561772
CELERY_TASK_PROTOCOL = 1
設定這邊,我們讀取環境變數裡的 REDIS_HOST, REDIS_PORT, RABBITMQ_HOST ,然後用 Redis 作為 Celery 的 Result backend ,Result backend 是用來存放結果的地方;用 RabbitMQ 作為 Celery 的 broker backend,這指的是訊息佇列的位置。
除了 result 與 broker 之外,我們也指定了 Celery 要用的時區、內容格式。最後一個 CELERY_TASK_PROTOCOL 則是自己所遇到的經驗,如果是使用 librabbitmq 的話,得設定為 1 ,否則 Celery 跟 RabbitMQ 溝通時會有警告訊息。
設定好以後,接下來就是怎麼使用了。
第一步,得先準備 Celery app,這是在執行 celery 指令,會去呼叫的應用程式起點。
# django_ithome_ironman/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_ithome_ironman.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
注意最後一行的 app.autodiscover_tasks() ,這會自動去尋找程式裡有加上 task decorator 的函式。
第二步,修改 django_ithome_ironman/init.py ,import 我們剛剛加的 celery app
# django_ithome_ironman/__init__.py
from __future__ import absolute_import
from .celery import app as celery_app
__all__ = ['celery_app']
第三步,準備要執行的 Task,這裡我們定義一個簡單的 add,加上 shared_task decorator。
# news/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
第四步,開啟一個終端機視窗,這裡要先執行 celery worker
poetry run celery -A django_ithome_ironman worker -l info
最後,來呼叫我們寫的 add Task,再開啟一個終端機視窗,進入 shell:poetry run python manage.py shell
>>> from django_ithome_ironman.tasks import add
>>> x = add.apply_async(args=(3, 4,))
>>> print(f"x.result={x.result}")
這時就會在前面那個終端機視窗看到 add 函式被呼叫執行了。
這邊的 x 是一個 AsyncResult 類別的實體,必須要使用 x.result 才能取得函式執行的結果。
這次再試試看 countdown 這個參數,這個參數是指在指定秒數後才執行。
>>> z = add.apply_async(args=(4, 5, ), countdown=15)
>>> print(f"z.result={z.result}")
這時會發現 z.result 是空的,但經過 15秒以後再印一次,就有結果了。
關於 Celery 我們就介紹到這邊,但 Celery 不只有這些,還有許多功能值得挖掘,例如:
這裡因為篇幅關係,就此打住,上述範例的程式碼在 https://github.com/elleryq/ithome-iron-2020-django/tree/day-24